package com.zhoujing.rabbltmq.three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zhoujing.rabbltmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 周敬
 * @version 1.0
 * @createTime 2022/7/9-17:56-星期六
 * <p>
 * 消息在手动应答时不丢失，放回队列中重新消费
 */
public class Work03 {
    /**
     * 队列名称
     */
    public static final String TASK_QUEUE_NAME = "ack_queue";


    /**
     * 消费者接收消息
     *
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");

        // 传递回调
        DeliverCallback deliverCallback = (tag,message)->{
            // 让线程睡眠15秒
            try {
                Thread.sleep(15*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到的消息为："+new String(message.getBody(),"UTF-8"));

            // 手动应答
            // basicAck(消息标识，是否批量应答)
            // 消息标识：用于每个消息，是否批量应答：批量应答会导致消息丢失
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        // 取消回调
        CancelCallback cancelCallback = (tag)->{
            System.out.println(tag + "消费者取消回调");
        };
        channel.basicQos(5);
        boolean autoAck = false;
        // basicConsume(队列名称，是否自动应答，回调，取消回调)
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
